"""The tests for the Home Assistant HTTP component."""

from http import HTTPStatus
from ipaddress import ip_address
import logging
import os
from unittest.mock import AsyncMock, Mock, mock_open, patch

from aiohttp import web
from aiohttp.web_exceptions import HTTPUnauthorized
from aiohttp.web_middlewares import middleware
import pytest

from homeassistant.components import http
from homeassistant.components.http.ban import (
    IP_BANS_FILE,
    KEY_BAN_MANAGER,
    KEY_FAILED_LOGIN_ATTEMPTS,
    process_success_login,
    setup_bans,
)
from homeassistant.components.http.view import request_handler_factory
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.http import KEY_AUTHENTICATED, KEY_HASS
from homeassistant.setup import async_setup_component

from tests.common import async_get_persistent_notifications
from tests.test_util import mock_real_ip
from tests.typing import ClientSessionGenerator

SUPERVISOR_IP = "1.2.3.4"
BANNED_IPS = ["200.201.202.203", "100.64.0.2"]
BANNED_IPS_WITH_SUPERVISOR = [*BANNED_IPS, SUPERVISOR_IP]


@pytest.fixture(name="hassio_env")
def hassio_env_fixture(supervisor_is_connected: AsyncMock):
    """Fixture to inject hassio env."""
    with (
        patch.dict(os.environ, {"SUPERVISOR": "127.0.0.1"}),
        patch.dict(os.environ, {"SUPERVISOR_TOKEN": "123456"}),
    ):
        yield


@pytest.fixture(autouse=True)
def gethostbyaddr_mock():
    """Fixture to mock out I/O on getting host by address."""
    with patch(
        "homeassistant.components.http.ban.gethostbyaddr",
        return_value=("example.com", ["0.0.0.0.in-addr.arpa"], ["0.0.0.0"]),
    ):
        yield


async def test_access_from_banned_ip(
    hass: HomeAssistant, aiohttp_client: ClientSessionGenerator
) -> None:
    """Test accessing to server from banned IP. Both trusted and not."""
    app = web.Application()
    app[KEY_HASS] = hass
    setup_bans(hass, app, 5)
    set_real_ip = mock_real_ip(app)

    with patch(
        "homeassistant.components.http.ban.load_yaml_config_file",
        return_value={
            banned_ip: {"banned_at": "2016-11-16T19:20:03"} for banned_ip in BANNED_IPS
        },
    ):
        client = await aiohttp_client(app)

    for remote_addr in BANNED_IPS:
        set_real_ip(remote_addr)
        resp = await client.get("/")
        assert resp.status == HTTPStatus.FORBIDDEN


async def test_access_from_banned_ip_with_partially_broken_yaml_file(
    hass: HomeAssistant,
    aiohttp_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test accessing to server from banned IP. Both trusted and not.

    We inject some garbage into the yaml file to make sure it can
    still load the bans.
    """
    app = web.Application()
    app[KEY_HASS] = hass
    setup_bans(hass, app, 5)
    set_real_ip = mock_real_ip(app)

    data = {banned_ip: {"banned_at": "2016-11-16T19:20:03"} for banned_ip in BANNED_IPS}
    data["5.3.3.3"] = {"banned_at": "garbage"}

    with patch(
        "homeassistant.components.http.ban.load_yaml_config_file",
        return_value=data,
    ):
        client = await aiohttp_client(app)

    for remote_addr in BANNED_IPS:
        set_real_ip(remote_addr)
        resp = await client.get("/")
        assert resp.status == HTTPStatus.FORBIDDEN

    # Ensure garbage data is ignored
    set_real_ip("5.3.3.3")
    resp = await client.get("/")
    assert resp.status == HTTPStatus.NOT_FOUND

    assert "Failed to load IP ban" in caplog.text


async def test_access_from_banned_ip_with_invalid_ip_entry(
    hass: HomeAssistant,
    aiohttp_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test that invalid IP addresses in ban file are skipped gracefully.

    An invalid IP entry (e.g., with typo like "Eo128.199.160.243") should
    be logged as an error and skipped, allowing valid bans to still load.
    The test ensures that valid IPs after invalid ones are still processed.
    """
    app = web.Application()
    app[KEY_HASS] = hass
    setup_bans(hass, app, 5)
    set_real_ip = mock_real_ip(app)

    # Invalid IPs interspersed between valid ones to ensure continue works
    data = {
        "Eo128.199.160.243": {"banned_at": "2024-07-06T14:07:46"},
        BANNED_IPS[0]: {"banned_at": "2016-11-16T19:20:03"},
        "invalidip": {"banned_at": "2024-07-06T14:07:46"},
        BANNED_IPS[1]: {"banned_at": "2016-11-16T19:20:03"},
    }

    with patch(
        "homeassistant.components.http.ban.load_yaml_config_file",
        return_value=data,
    ):
        client = await aiohttp_client(app)

    # Verify exactly 2 valid IPs were loaded (invalid ones skipped)
    manager = app[KEY_BAN_MANAGER]
    assert len(manager.ip_bans_lookup) == len(BANNED_IPS)

    # Valid banned IPs should still be blocked (even though they came after invalid ones)
    for remote_addr in BANNED_IPS:
        set_real_ip(remote_addr)
        resp = await client.get("/")
        assert resp.status == HTTPStatus.FORBIDDEN

    # Non-banned IP should have access
    set_real_ip("192.168.1.1")
    resp = await client.get("/")
    assert resp.status == HTTPStatus.NOT_FOUND

    # Check that both invalid IP entries were logged
    for ip in ("Eo128.199.160.243", "invalidip"):
        assert (
            "homeassistant.components.http.ban",
            logging.ERROR,
            f"Failed to load IP ban: invalid IP address {ip}",
        ) in caplog.record_tuples


async def test_no_ip_bans_file(
    hass: HomeAssistant, aiohttp_client: ClientSessionGenerator
) -> None:
    """Test no ip bans file."""
    app = web.Application()
    app[KEY_HASS] = hass
    setup_bans(hass, app, 5)
    set_real_ip = mock_real_ip(app)

    with patch(
        "homeassistant.components.http.ban.load_yaml_config_file",
        side_effect=FileNotFoundError,
    ):
        client = await aiohttp_client(app)

    set_real_ip("4.3.2.1")
    resp = await client.get("/")
    assert resp.status == HTTPStatus.NOT_FOUND


async def test_failure_loading_ip_bans_file(
    hass: HomeAssistant, aiohttp_client: ClientSessionGenerator
) -> None:
    """Test failure loading ip bans file."""
    app = web.Application()
    app[KEY_HASS] = hass
    setup_bans(hass, app, 5)
    set_real_ip = mock_real_ip(app)

    with patch(
        "homeassistant.components.http.ban.load_yaml_config_file",
        side_effect=HomeAssistantError,
    ):
        client = await aiohttp_client(app)

    set_real_ip("4.3.2.1")
    resp = await client.get("/")
    assert resp.status == HTTPStatus.NOT_FOUND


async def test_ip_ban_manager_never_started(
    hass: HomeAssistant,
    aiohttp_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test we handle the ip ban manager not being started."""
    app = web.Application()
    app[KEY_HASS] = hass
    setup_bans(hass, app, 5)
    set_real_ip = mock_real_ip(app)

    with patch(
        "homeassistant.components.http.ban.load_yaml_config_file",
        side_effect=FileNotFoundError,
    ):
        client = await aiohttp_client(app)

    # Mock the manager never being started
    del app[KEY_BAN_MANAGER]

    set_real_ip("4.3.2.1")
    resp = await client.get("/")
    assert resp.status == HTTPStatus.NOT_FOUND
    assert "IP Ban middleware loaded but banned IPs not loaded" in caplog.text


@pytest.mark.parametrize(
    ("remote_addr", "bans", "status"),
    list(
        zip(
            BANNED_IPS_WITH_SUPERVISOR,
            [1, 1, 0],
            [HTTPStatus.FORBIDDEN, HTTPStatus.FORBIDDEN, HTTPStatus.UNAUTHORIZED],
            strict=False,
        )
    ),
)
async def test_access_from_supervisor_ip(
    remote_addr,
    bans,
    status,
    hass: HomeAssistant,
    aiohttp_client: ClientSessionGenerator,
    hassio_env,
    resolution_info: AsyncMock,
) -> None:
    """Test accessing to server from supervisor IP."""
    app = web.Application()
    app[KEY_HASS] = hass

    async def unauth_handler(request):
        """Return a mock web response."""
        raise HTTPUnauthorized

    app.router.add_get("/", unauth_handler)
    setup_bans(hass, app, 1)
    mock_real_ip(app)(remote_addr)

    with patch(
        "homeassistant.components.http.ban.load_yaml_config_file",
        return_value={},
    ):
        client = await aiohttp_client(app)

    manager = app[KEY_BAN_MANAGER]

    assert await async_setup_component(hass, "hassio", {"hassio": {}})

    m_open = mock_open()

    with (
        patch.dict(os.environ, {"SUPERVISOR": SUPERVISOR_IP}),
        patch("homeassistant.components.http.ban.open", m_open, create=True),
    ):
        resp = await client.get("/")
        assert resp.status == HTTPStatus.UNAUTHORIZED
        assert len(manager.ip_bans_lookup) == bans
        assert m_open.call_count == bans

        # second request should be forbidden if banned
        resp = await client.get("/")
        assert resp.status == status
        assert len(manager.ip_bans_lookup) == bans


async def test_ban_middleware_not_loaded_by_config(hass: HomeAssistant) -> None:
    """Test accessing to server from banned IP when feature is off."""
    with patch("homeassistant.components.http.setup_bans") as mock_setup:
        await async_setup_component(
            hass, "http", {"http": {http.CONF_IP_BAN_ENABLED: False}}
        )

    assert len(mock_setup.mock_calls) == 0


async def test_ban_middleware_loaded_by_default(hass: HomeAssistant) -> None:
    """Test accessing to server from banned IP when feature is off."""
    with patch("homeassistant.components.http.setup_bans") as mock_setup:
        await async_setup_component(hass, "http", {"http": {}})

    assert len(mock_setup.mock_calls) == 1


async def test_ip_bans_file_creation(
    hass: HomeAssistant,
    aiohttp_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Testing if banned IP file created."""
    app = web.Application()
    app[KEY_HASS] = hass

    async def unauth_handler(request):
        """Return a mock web response."""
        raise HTTPUnauthorized

    app.router.add_get("/example", unauth_handler)
    setup_bans(hass, app, 2)
    mock_real_ip(app)("200.201.202.204")

    with patch(
        "homeassistant.components.http.ban.load_yaml_config_file",
        return_value={
            banned_ip: {"banned_at": "2016-11-16T19:20:03"} for banned_ip in BANNED_IPS
        },
    ):
        client = await aiohttp_client(app)

    manager = app[KEY_BAN_MANAGER]
    m_open = mock_open()

    with patch("homeassistant.components.http.ban.open", m_open, create=True):
        resp = await client.get("/example")
        assert resp.status == HTTPStatus.UNAUTHORIZED
        assert len(manager.ip_bans_lookup) == len(BANNED_IPS)
        assert m_open.call_count == 0

        resp = await client.get("/example")
        assert resp.status == HTTPStatus.UNAUTHORIZED
        assert len(manager.ip_bans_lookup) == len(BANNED_IPS) + 1
        m_open.assert_called_once_with(
            hass.config.path(IP_BANS_FILE), "a", encoding="utf8"
        )

        resp = await client.get("/example")
        assert resp.status == HTTPStatus.FORBIDDEN
        assert m_open.call_count == 1

        notifications = async_get_persistent_notifications(hass)
        assert len(notifications) == 2
        assert (
            notifications["http-login"]["message"]
            == "Login attempt or request with invalid authentication from example.com (200.201.202.204). See the log for details."
        )

        assert (
            "Login attempt or request with invalid authentication from example.com (200.201.202.204). Requested URL: '/example'."
            in caplog.text
        )


async def test_failed_login_attempts_counter(
    hass: HomeAssistant, aiohttp_client: ClientSessionGenerator
) -> None:
    """Testing if failed login attempts counter increased."""
    app = web.Application()
    app[KEY_HASS] = hass

    async def auth_handler(request):
        """Return 200 status code."""
        return None, 200

    async def auth_true_handler(request):
        """Return 200 status code."""
        process_success_login(request)
        return None, 200

    app.router.add_get(
        "/auth_true",
        request_handler_factory(hass, Mock(requires_auth=True), auth_true_handler),
    )
    app.router.add_get(
        "/auth_false",
        request_handler_factory(hass, Mock(requires_auth=True), auth_handler),
    )
    app.router.add_get(
        "/", request_handler_factory(hass, Mock(requires_auth=False), auth_handler)
    )

    setup_bans(hass, app, 5)
    remote_ip = ip_address("200.201.202.204")
    mock_real_ip(app)("200.201.202.204")

    @middleware
    async def mock_auth(request, handler):
        """Mock auth middleware."""
        if "auth_true" in request.path:
            request[KEY_AUTHENTICATED] = True
        else:
            request[KEY_AUTHENTICATED] = False
        return await handler(request)

    app.middlewares.append(mock_auth)

    client = await aiohttp_client(app)

    resp = await client.get("/auth_false")
    assert resp.status == HTTPStatus.UNAUTHORIZED
    assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 1

    resp = await client.get("/auth_false")
    assert resp.status == HTTPStatus.UNAUTHORIZED
    assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 2

    resp = await client.get("/")
    assert resp.status == HTTPStatus.OK
    assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 2

    # This used to check that with trusted networks we reset login attempts
    # We no longer support trusted networks.
    resp = await client.get("/auth_true")
    assert resp.status == HTTPStatus.OK
    assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 0

    resp = await client.get("/auth_false")
    assert resp.status == HTTPStatus.UNAUTHORIZED
    assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 1

    resp = await client.get("/auth_false")
    assert resp.status == HTTPStatus.UNAUTHORIZED
    assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 2


async def test_single_ban_file_entry(
    hass: HomeAssistant,
) -> None:
    """Test that only one item is added to ban file."""
    app = web.Application()
    app[KEY_HASS] = hass

    async def unauth_handler(request):
        """Return a mock web response."""
        raise HTTPUnauthorized

    app.router.add_get("/example", unauth_handler)
    setup_bans(hass, app, 2)
    mock_real_ip(app)("200.201.202.204")

    manager = app[KEY_BAN_MANAGER]
    m_open = mock_open()

    with patch("homeassistant.components.http.ban.open", m_open, create=True):
        remote_ip = ip_address("200.201.202.204")
        await manager.async_add_ban(remote_ip)
        await manager.async_add_ban(remote_ip)

    assert m_open.call_count == 1
